package com.abyss.transformation;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple1;

/**
 * Reads an Apache access log and prints the distinct first tokens of its lines,
 * i.e. the set of client IPs that accessed the site.
 *
 * <p>Usage: {@code DistinctDemo [logPath]}. When no argument is given, the
 * built-in default path is read.
 */
public class DistinctDemo {

    /** Log file read when no path is supplied on the command line. */
    private static final String DEFAULT_LOG_PATH =
            "/Users/abyss/Dev/toys/flink/H-flink-learn/src/main/resources/apache.log";

    /**
     * Builds and runs the batch job.
     *
     * @param args optional; {@code args[0]} overrides the path of the log file to read
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Source: the path may be overridden by the first command-line argument
        String logPath = args.length > 0 ? args[0] : DEFAULT_LOG_PATH;
        DataSource<String> fileSource = env.readTextFile(logPath);

        // 3. Drop blank lines, then wrap the first token of each line in a tuple
        //    so that distinct(int) can address it by field position.
        MapOperator<String, Tuple1<String>> ips = fileSource
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String line) throws Exception {
                        // A blank line carries no IP and would otherwise appear as "" in the output.
                        return !line.trim().isEmpty();
                    }
                })
                .map(new MapFunction<String, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> map(String line) throws Exception {
                        return Tuple1.of(extractIp(line));
                    }
                });

        // 4. De-duplicate on tuple field 0 and print the result
        ips.distinct(0).print();
    }

    /**
     * Returns the first space-delimited token of {@code line}, ignoring
     * leading and trailing whitespace.
     *
     * <p>Unlike {@code line.split(" ")[0]}, this never throws for a line made
     * only of spaces (where {@code split} returns an empty array) and does not
     * return an empty token for a line that starts with a space.
     *
     * @param line one raw line of the log
     * @return the first token, or the empty string if the line is blank
     */
    private static String extractIp(String line) {
        String trimmed = line.trim();
        int firstSpace = trimmed.indexOf(' ');
        return firstSpace < 0 ? trimmed : trimmed.substring(0, firstSpace);
    }
}
